package day03.sink;

import beans.SensorReading;
import day02.transform.FlinkTransform00;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Flink 流处理 API - sink
 * <p>
 * 将数据输出到 Elasticsearch 中
 *
 * @author lvbingbing
 * @date 2021-12-20 08:47
 */
public class FlinkSink02 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 1;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习 将数据输出到 Elasticsearch 中
        studyWriteToElasticsearch(flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 将数据输出到 Elasticsearch 中
     *
     * @param sensorReadingStream <br>
     */
    private static void studyWriteToElasticsearch(DataStream<SensorReading> sensorReadingStream) {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200));
        sensorReadingStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, new MyElasticsearchSinkFunction()).build());
    }

    /**
     * 实现自定义的ES写入操作
     */
    static class MyElasticsearchSinkFunction implements ElasticsearchSinkFunction<SensorReading> {

        private static final long serialVersionUID = 770136489057713067L;

        @Override
        public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            Map<String, String> map = new HashMap<>();
            map.put("id", sensorReading.getId());
            map.put("temp", sensorReading.getTemperature().toString());
            map.put("ts", sensorReading.getTimestamp().toString());

            // 创建请求，作为向es发起的写入命令
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(map);

            // 用 index 发送请求
            requestIndexer.add(indexRequest);
        }
    }
}
